/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.schemas.utils;



import net.bytebuddy.ByteBuddy;

import net.bytebuddy.description.type.TypeDescription;

import net.bytebuddy.dynamic.DynamicType;

import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;

import net.bytebuddy.dynamic.scaffold.InstrumentedType;

import net.bytebuddy.implementation.Implementation;

import net.bytebuddy.implementation.bytecode.ByteCodeAppender;

import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;

import net.bytebuddy.implementation.bytecode.StackManipulation;

import net.bytebuddy.implementation.bytecode.member.MethodReturn;

import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;

import net.bytebuddy.matcher.ElementMatchers;

import com.bff.gaia.unified.sdk.schemas.NoSuchSchemaException;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.schemas.SchemaCoder;

import com.bff.gaia.unified.sdk.schemas.SchemaRegistry;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.SerializableFunctions;

import com.bff.gaia.unified.sdk.util.common.ReflectHelpers;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TypeDescriptor;

import com.bff.gaia.unified.vendor.guava.com.google.common.primitives.Primitives;

import com.bff.gaia.unified.sdk.schemas.JavaFieldSchema;



import javax.annotation.Nullable;

import java.io.Serializable;

import java.lang.reflect.InvocationTargetException;

import java.lang.reflect.Type;



/** Helper functions for converting between equivalent schema types. */

public class ConvertHelpers {

  /** Return value after converting a schema. */

  public static class ConvertedSchemaInformation<T> implements Serializable {

    // If the output type is a composite type, this is the schema coder.

    @Nullable

	public final SchemaCoder<T> outputSchemaCoder;

    @Nullable

	public final Schema.FieldType unboxedType;



    public ConvertedSchemaInformation(

		@Nullable SchemaCoder<T> outputSchemaCoder, @Nullable Schema.FieldType unboxedType) {

      this.outputSchemaCoder = outputSchemaCoder;

      this.unboxedType = unboxedType;

    }

  }



  /** Get the coder used for converting from an inputSchema to a given type. */

  public static <T> ConvertedSchemaInformation<T> getConvertedSchemaInformation(

	  Schema inputSchema, TypeDescriptor<T> outputType, SchemaRegistry schemaRegistry) {

    ConvertedSchemaInformation<T> convertedSchema = null;

    boolean toRow = outputType.equals(TypeDescriptor.of(Row.class));

    if (toRow) {

      // If the output is of type Row, then just forward the schema of the input type to the

      // output.

      convertedSchema =

          new ConvertedSchemaInformation<>(

              (SchemaCoder<T>)

                  SchemaCoder.of(

                      inputSchema,

                      SerializableFunctions.identity(),

                      SerializableFunctions.identity()),

              null);

    } else {

      // Otherwise, try to find a schema for the output type in the schema registry.

      Schema outputSchema = null;

      SchemaCoder<T> outputSchemaCoder = null;

      try {

        outputSchema = schemaRegistry.getSchema(outputType);

        outputSchemaCoder =

            SchemaCoder.of(

                outputSchema,

                schemaRegistry.getToRowFunction(outputType),

                schemaRegistry.getFromRowFunction(outputType));

      } catch (NoSuchSchemaException e) {



      }

      Schema.FieldType unboxedType = null;

      // TODO: Properly handle nullable.

      if (outputSchema == null || !outputSchema.assignableToIgnoreNullable(inputSchema)) {

        // The schema is not convertible directly. Attempt to unbox it and see if the schema matches

        // then.

        Schema checkedSchema = inputSchema;

        if (inputSchema.getFieldCount() == 1) {

          unboxedType = inputSchema.getField(0).getType();

          if (unboxedType.getTypeName().isCompositeType()

              && !outputSchema.assignableToIgnoreNullable(unboxedType.getRowSchema())) {

            checkedSchema = unboxedType.getRowSchema();

          } else {

            checkedSchema = null;

          }

        }

        if (checkedSchema != null) {

          throw new RuntimeException(

              "Cannot convert between types that don't have equivalent schemas."

                  + " input schema: "

                  + checkedSchema

                  + " output schema: "

                  + outputSchema);

        }

      }

      convertedSchema = new ConvertedSchemaInformation<T>(outputSchemaCoder, unboxedType);

    }

    return convertedSchema;

  }



  /**

   * Returns a function to convert a Row into a primitive type. This only works when the row schema

   * contains a single field, and that field is convertible to the primitive type.

   */

  @SuppressWarnings("unchecked")

  public static <OutputT> SerializableFunction<?, OutputT> getConvertPrimitive(

	  Schema.FieldType fieldType, TypeDescriptor<?> outputTypeDescriptor) {

    Schema.FieldType expectedFieldType =

        StaticSchemaInference.fieldFromType(outputTypeDescriptor, JavaFieldSchema.JavaFieldTypeSupplier.INSTANCE);

    if (!expectedFieldType.equals(fieldType)) {

      throw new IllegalArgumentException(

          "Element argument type "

              + outputTypeDescriptor

              + " does not work with expected schema field type "

              + fieldType);

    }



    Type expectedInputType = new ByteBuddyUtils.ConvertType(true).convert(outputTypeDescriptor);



    TypeDescriptor<?> outputType = outputTypeDescriptor;

    if (outputType.getRawType().isPrimitive()) {

      // A SerializableFunction can only return an Object type, so if the DoFn parameter is a

      // primitive type, then box it for the return. The return type will be unboxed before being

      // forwarded to the DoFn parameter.

      outputType = TypeDescriptor.of(Primitives.wrap(outputType.getRawType()));

    }



    TypeDescription.Generic genericType =

        TypeDescription.Generic.Builder.parameterizedType(

                SerializableFunction.class, expectedInputType, outputType.getType())

            .build();

    DynamicType.Builder<SerializableFunction> builder =

        (DynamicType.Builder<SerializableFunction>) new ByteBuddy().subclass(genericType);

    try {

      return builder

          .method(ElementMatchers.named("apply"))

          .intercept(new ConvertPrimitiveInstruction(outputType))

          .make()

          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)

          .getLoaded()

          .getDeclaredConstructor()

          .newInstance();

    } catch (InstantiationException

        | IllegalAccessException

        | NoSuchMethodException

        | InvocationTargetException e) {

      throw new RuntimeException(e);

    }

  }



  static class ConvertPrimitiveInstruction implements Implementation {

    private final TypeDescriptor<?> outputFieldType;



    public ConvertPrimitiveInstruction(TypeDescriptor<?> outputFieldType) {

      this.outputFieldType = outputFieldType;

    }



    @Override

    public InstrumentedType prepare(InstrumentedType instrumentedType) {

      return instrumentedType;

    }



    @Override

    public ByteCodeAppender appender(final Target implementationTarget) {

      return (methodVisitor, implementationContext, instrumentedMethod) -> {

        int numLocals = 1 + instrumentedMethod.getParameters().size();



        // Method param is offset 1 (offset 0 is the this parameter).

        StackManipulation readValue = MethodVariableAccess.REFERENCE.loadFrom(1);

        StackManipulation stackManipulation =

            new StackManipulation.Compound(

                new ByteBuddyUtils.ConvertValueForSetter(readValue).convert(outputFieldType),

                MethodReturn.REFERENCE);



        StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);

        return new Size(size.getMaximalSize(), numLocals);

      };

    }

  }

}